In [1]:
import pandas as pd
import numpy as np
import subprocess
import os
import plotly.express as px
import plotly.graph_objects as go

Concept definition

  • file system -- the tested posix file system, including distributed ones, such as glusterfs, and single-host ones, such as xfs.
  • file -- the file written and read on file system.
  • record -- the entity written to the file; all records in one file have a fixed length.
  • worker -- a client of file system, technically is a process occupying a core of CPU.
  • combination -- combines arguments of length of record, number of record, files for per worker and number of workers.
  • epoch -- a turn of testing file system with an unique combination.

Aims to achieve

  • evaluate the reliability of file system.
  • find combinations of record length, record number, files, and workers that yield the best whole-host efficiency and the best per-process efficiency.

Steps to use this fsXT

  • Before doing the following steps, you should build (make) the source; the executable file fsXT will be generated and saved to the fsXT/src/fsXT directory where this notebook file resides.
  • Step 1: Please adjust the following default argument values if they are inappropriate for your setup.
  • Step 2: Generate test scripts.
  • Step 3: Run test and wait until all epochs finished.
  • Step 4: Collect data from log files of epochs.
  • Step 5: Wrangle data.
  • Step 6: Evaluate reliability by comparing the md5 digest from writing with the md5 digest from reading for each file.
  • Step 7: Analyze correlations among epoch speed, file write speed, and file read speed.
  • Step 8: Score combinations on epoch speed 70%, file write speed 20%, file read speed 10%, and show top 10.
  • Step 9: Draw I/O graphs of top 10.

Step 1: Please adjust the following default argument values if they are inappropriate for your setup.

  • fsXT will invoke the executable ttfs many times. Each invocation is an epoch, which starts worker processes according to LST_WORKERS.
  • Each worker gets the record length and the number of records per file, and calculates the number of files as TOTAL_LOAD_PER_EPOCH / (WORKERS × RECORDLEN × RECORDNUM)
In [2]:
WRITE_PATH = "/mnt/ttfs"
READ_PATH = "/mnt1/ttfs"
TOTAL_LOAD_PER_EPOCH = 1024*1024*256         #256MB
LST_WORKERS = [20,24,28,32,36,40,44,48,52,56,60,64]
LST_RECORDLEN = [256,512,768,1024]
LST_RECORDNUM = [4096, 8192, 12288]

Not a step; the following is the core source code. Please don't modify it unless you really know what you want to do.

In [3]:
class fsXT(object):
    def __init__(self):
        self.epochs = dict()   
        self.len_epochs = 0
        self.pwd = os.getcwd()
        self.testscriptfile = '%s/batch'%self.pwd

    def generate_data(self):
        es = set()
        for arg_p in LST_WORKERS:
            for arg_l in LST_RECORDLEN:
                for arg_n in LST_RECORDNUM:
                    arg_f = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p*arg_l*arg_n)))
                    arg_n = max(1, int(TOTAL_LOAD_PER_EPOCH/(arg_p*arg_f*arg_l)))

                    cmdline = "%s/ttfs -w%s -r%s -p%d -l%d -n%d -f%d" % \
                            (self.pwd, WRITE_PATH, READ_PATH, arg_p, arg_l, arg_n, arg_f)
                    es.add(cmdline)

        sf = open(self.testscriptfile,'w')
        arg_e = 1
        for i in es:
            sf.write((i+' -e%d\n'%arg_e))
            arg_e += 1
        sf.close()
        self.len_epochs = len(es)

        print('The test script file was generated and save in %s'%self.testscriptfile)
            


    def run_test(self):
        cmdline = 'chmod +x %s;nohup %s &'%(self.testscriptfile, self.testscriptfile)
        print(cmdline)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('NOTICE: THIS IS A TIME CONSUMING JOB. YOU SHOULD WAIT UNTIL ALL EPOCHS FINISHED. PLEASE CHECK ITS STATUS ON HOST.')

        
    def collect_data(self):
        cmdline = \
"""
mkdir -p /tmp/fsXTlog;pushd /tmp/fsXTlog;rm files.log timeticks.log epochs.log -f;mv ../ttfslog.???? ./ -f;
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^1,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> files.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^2,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> timeticks.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^3,"|cut -f1 -d"," --complement >> epochs.log;done
"""%(self.len_epochs, self.len_epochs, self.len_epochs)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('Finished collecting data from /tmp/ttfslog.*, and save data to /tmp/fsXTlog/files.log, timeticks.log and epochs.log ')
        
        
    def wrangle_data(self):        
        self.df_epochs = pd.read_csv("/tmp/fsXTlog/epochs.log", header=None)
        self.df_epochs.columns = ['epoch', 'workers', 'record_length', 'record_number', 'files', 'start_timestamp', 'end_timestamp']
        self.df_files = pd.read_csv("/tmp/fsXTlog/files.log", header=None)
        self.df_files.columns = ['epoch', 'worker_id', 'filename','record_length', 'record_number', 'md5_write', 'md5_read', 
                                 'tlen_open','tlen_close','tlen_mv','tlen_write', 'tlen_read']
        self.df_timeticks = pd.read_csv("/tmp/fsXTlog/timeticks.log", header=None)
        self.df_timeticks.columns = ['epoch', 'worker_id', 'checkpoint','unit_wbytes', 'unit_rbytes', 'elapsed_time']
        self.df_timeticks['unit_wbytes'] = self.df_timeticks['unit_wbytes']/1024**2
        self.df_timeticks['unit_rbytes'] = self.df_timeticks['unit_rbytes']/1024**2
    
        self.df_epochs['total_files'] = self.df_epochs['files']*self.df_epochs['workers']
        self.df_epochs['file_size'] = self.df_epochs['record_length']*self.df_epochs['record_number']
        self.df_epochs['duration'] = self.df_epochs['end_timestamp'] -  self.df_epochs['start_timestamp']
        self.df_epochs['total_load'] = self.df_epochs['total_files']*self.df_epochs['file_size']
        self.df_epochs['e_speed'] = self.df_epochs['total_load']/self.df_epochs['duration']/1024**2
        self.df_epochs.index = self.df_epochs['epoch']
        self.df_epochs.index.name = ""
        
        self.df_files['tlen_file'] = self.df_files['tlen_open']+self.df_files['tlen_close'] \
                +self.df_files['tlen_mv']+self.df_files['tlen_write']+self.df_files['tlen_read']
        
        self.df_files.drop(columns = ['record_length', 'record_number'], inplace=True)
        self.df_files = pd.merge(self.df_files, self.df_epochs, how='left', left_on=['epoch'], right_on = ['epoch'])
        self.df_files['fw_speed'] = self.df_files['file_size']/self.df_files['tlen_write']/1024**2
        self.df_files['fr_speed'] = self.df_files['file_size']/self.df_files['tlen_read']/1024**2
        
        df_score = xt.df_files[['e_speed','fw_speed','fr_speed']].agg(['min','max'])
        df_score = df_score.append(pd.Series(df_score.loc['max'] - df_score.loc['min'], name='range'))

        df_score_rst = self.df_files[['epoch','e_speed','fw_speed', 'fr_speed']].groupby(['epoch']).mean()
        df_score_rst['epoch_score'] = (df_score_rst['e_speed'] - df_score.loc['min','e_speed'])/df_score.loc['range','e_speed']*100.0
        df_score_rst['fw_score'] = (df_score_rst['fw_speed'] - df_score.loc['min','fw_speed'])/df_score.loc['range','fw_speed']*100.0
        df_score_rst['fr_score'] = (df_score_rst['fr_speed'] - df_score.loc['min','fr_speed'])/df_score.loc['range','fr_speed']*100.0
        df_score_rst['score'] = df_score_rst['epoch_score']*0.7+df_score_rst['fw_score']*0.2+df_score_rst['fr_score']*0.1
        #xt.df_files.sort_values(['fw_speed'], ascending=False)[xt.df_files.columns[10:]]
        self.df_epochs = pd.merge(self.df_epochs, df_score_rst[['fw_speed', 'fr_speed','epoch_score','fw_score','fr_score','score']], \
                 how='inner', left_index=True, right_index=True)
        print('Finished wrangling data.')
        

    def eval_reliability(self):
        df_nonreliability = self.df_files.query('md5_write!=md5_read')
        if len(df_nonreliability) > 0:
            print('!!!!!!! Fail to pass Reliablity Test !!!!!!!')
            print('------The following files have different digests of writing and reading:-----')
            print(self.df_files)
        else:
            print('### Success to pass Reliablity Test ###')
            
    def analysize_correlationship(self):
        tmpdf = self.df_epochs[['workers', 'record_length', 'record_number', 'files', \
                            'total_files', 'file_size', 'e_speed']]
        self.__analysize_correlationship(tmpdf, 'e_speed', 'EPOCHS PERFORMACE')
        
        tmpdf = self.df_files[['workers', 'record_length', 'record_number', \
                            'file_size', 'files','total_files', 'fw_speed', 'fr_speed']]
        self.__analysize_correlationship(tmpdf, 'fw_speed', 'WORKER PERFORMANCE')
        self.__analysize_correlationship(tmpdf, 'fr_speed', 'WORKER PERFORMANCE')
        

    def __analysize_correlationship(self, df, basic_col, df_label):
        sr_corr = df.corr()[basic_col]
        sr_corr.dropna(inplace=True)
        sr_corr.drop(labels=[basic_col], inplace=True)

        order = sr_corr.abs().sort_values(ascending = False)
        
        print('### Analysizing result of %s based on "%s" ###' % (df_label, basic_col)  )
        print(sr_corr[order.index])
        #print('--- the most factor influencing "%s" is "%s"'%(basic_col,order.index[0]))
            

Step 2: Generate test scripts.

In [4]:
xt = fsXT()
xt.generate_data()
The test script file was generated and save in /root/codesdir/fsXT/src/fsXT/batch

Step 3: Run test and wait until all epochs finished. NOTICE -- THIS IS A TIME CONSUMING JOB.

In [ ]:
xt.run_test()

Step 4: Collect data from log files of epochs.

In [5]:
xt.collect_data()
Finished collecting data from /tmp/ttfslog.*, and save data to /tmp/fsXTlog/files.log, timeticks.log and epochs.log 

Step 5: Wrangle data.

In [6]:
xt.wrangle_data()
Finished wrangling data.

Step 6: Evaluate reliability by comparing the md5 digest from writing with the md5 digest from reading for each file.

In [7]:
xt.eval_reliability()
### Success to pass Reliablity Test ###

Step 7: Analyze correlations among epoch speed, file write speed, and file read speed.

In [8]:
xt.analysize_correlationship()
### Analysizing result of EPOCHS PERFORMACE based on "e_speed" ###
record_length    0.895476
total_files     -0.487757
file_size        0.484239
files           -0.345987
record_number   -0.255739
workers         -0.223157
Name: e_speed, dtype: float64
### Analysizing result of WORKER PERFORMANCE based on "fw_speed" ###
record_length    0.710429
workers         -0.507266
file_size        0.479296
total_files     -0.415885
fr_speed        -0.180023
files           -0.118743
record_number   -0.093798
Name: fw_speed, dtype: float64
### Analysizing result of WORKER PERFORMANCE based on "fr_speed" ###
file_size       -0.492793
total_files      0.465953
record_length   -0.374628
files            0.361422
record_number   -0.299410
fw_speed        -0.180023
workers          0.009179
Name: fr_speed, dtype: float64

Step 8: Score combinations on epoch speed 70%, file write speed 20%, file read speed 10%, and show top 10.

In [9]:
xt.df_epochs.sort_values(['score'],ascending=False)[:10]\
    [['epoch','workers','record_length','record_number',\
      'duration','total_files','e_speed', 'fw_speed', 'fr_speed',\
       'epoch_score', 'fw_score', 'fr_score', 'score']]
Out[9]:
epoch workers record_length record_number duration total_files e_speed fw_speed fr_speed epoch_score fw_score fr_score score
38 38 32 1024 8192 15.599013 32 16.411295 0.561827 40.823830 100.000000 49.095928 15.530386 81.372224
51 51 28 1024 4681 16.220592 56 15.781926 0.667206 20.668783 95.822399 58.643508 6.838554 79.488236
46 46 32 1024 4096 21.477083 64 11.919682 0.532969 44.664500 70.185719 46.481380 17.186668 60.144946
56 56 56 1024 4681 20.630908 56 12.408188 0.236016 15.104839 73.428309 19.576775 4.439112 55.759083
18 18 36 1024 7281 25.213650 36 10.152146 0.298857 12.316745 58.453226 25.270282 3.236751 46.294990
80 80 20 1024 4369 28.315550 60 9.040831 0.514088 38.568995 51.076574 44.770669 14.557992 46.163534
63 63 48 1024 5461 25.882375 48 9.890297 0.217578 12.475834 56.715133 17.906229 3.305358 43.612375
86 86 28 1024 9362 27.863066 28 9.187510 0.356923 10.574126 52.050194 30.531201 2.485249 42.789901
11 11 64 1024 4096 26.724644 64 9.579173 0.156957 14.323673 54.649968 12.413877 4.102235 41.147976
83 83 24 1024 5461 30.928032 48 8.276775 0.372638 32.290110 46.004952 31.955020 11.850232 39.779494

Step 9: Draw I/O graphs of top 10.

In [10]:
#from plotly.subplots import make_subplots
#fig = make_subplots(rows=5, cols=2)

i = 1
for e in xt.df_epochs.sort_values(['score'],ascending=False)[:10]['epoch']:
    tmpdf = xt.df_timeticks.query('epoch==%d'%e)

    fig = go.Figure()
    trace_write = go.Histogram(histfunc="sum", y=tmpdf['unit_wbytes'], x=tmpdf['elapsed_time'], 
                               name="Epoch%d_writing"%e, xbins=go.histogram.XBins(start=0, size=1))

    trace_read = go.Histogram(histfunc="sum", y=tmpdf['unit_rbytes'], x=tmpdf['elapsed_time'], 
                               name="Epoch%d_reading"%e, xbins=go.histogram.XBins(start=0, size=1))

    fig.add_trace(trace_write)
    fig.add_trace(trace_read)

    es = xt.df_epochs.loc[e]
    fig.update_layout(barmode="stack",bargap=0.1, 
                      xaxis_title='Elapsed time (seconds)',
                      yaxis_title='MBytes')
    print('Rank: %d\n'\
        'Epoch: %d, Spent time: %.2f seconds,\n'\
        'Record length: %d, Records: %d,\n'
        'Workers: %d, Total files: %d'%
        (i,e, es['duration'], es['record_length'],\
        es['record_number'], es['workers'], es['total_files']))

    i += 1
    fig.show()
Rank: 1
Epoch: 38, Spent time: 15.60 seconds,
Record length: 1024, Records: 8192,
Workers: 32, Total files: 32
Rank: 2
Epoch: 51, Spent time: 16.22 seconds,
Record length: 1024, Records: 4681,
Workers: 28, Total files: 56
Rank: 3
Epoch: 46, Spent time: 21.48 seconds,
Record length: 1024, Records: 4096,
Workers: 32, Total files: 64
Rank: 4
Epoch: 56, Spent time: 20.63 seconds,
Record length: 1024, Records: 4681,
Workers: 56, Total files: 56
Rank: 5
Epoch: 18, Spent time: 25.21 seconds,
Record length: 1024, Records: 7281,
Workers: 36, Total files: 36
Rank: 6
Epoch: 80, Spent time: 28.32 seconds,
Record length: 1024, Records: 4369,
Workers: 20, Total files: 60
Rank: 7
Epoch: 63, Spent time: 25.88 seconds,
Record length: 1024, Records: 5461,
Workers: 48, Total files: 48
Rank: 8
Epoch: 86, Spent time: 27.86 seconds,
Record length: 1024, Records: 9362,
Workers: 28, Total files: 28
Rank: 9
Epoch: 11, Spent time: 26.72 seconds,
Record length: 1024, Records: 4096,
Workers: 64, Total files: 64
Rank: 10
Epoch: 83, Spent time: 30.93 seconds,
Record length: 1024, Records: 5461,
Workers: 24, Total files: 48
In [ ]: